{% if not is_finite %}
// Infinite stream
private final FlowableProcessor<{% if return_type.is_primitive %}{{ return_type.name }}{% elif return_type.is_repeated %}List<{{ plugin_name.upper_camel_case }}.{{ return_type.inner_name }}>{% else %}{{ plugin_name.upper_camel_case }}.{{ return_type.name }}{% endif %}> {{ name.lower_camel_case }}Processor = PublishProcessor.create();
private final Flowable<{% if return_type.is_primitive %}{{ return_type.name }}{% elif return_type.is_repeated %}List<{{ plugin_name.upper_camel_case }}.{{ return_type.inner_name }}>{% else %}{{ plugin_name.upper_camel_case }}.{{ return_type.name }}{% endif %}> {{ name.lower_camel_case }} = {{ name.lower_camel_case }}Processor.onBackpressureBuffer().share();;
private volatile boolean is{{ name.upper_camel_case }}Initialized = false;

private void process{{ name.upper_camel_case }}({% for param in params %}@NonNull {{ param.type_info.name }} {{ param.name.lower_camel_case }}{{ ", " if not loop.last }}{% endfor %}) {
  {{ plugin_name.upper_camel_case }}Proto.Subscribe{{ name.upper_camel_case }}Request request = {{ plugin_name.upper_camel_case }}Proto.Subscribe{{ name.upper_camel_case }}Request.newBuilder()
    {%- for param in params %}
      {%- if param.type_info.is_primitive %}
    .set{{ param.name.upper_camel_case }}({{ param.name.lower_camel_case }})
      {%- elif param.type_info.is_repeated %}
    .addAll{{ param.name.upper_camel_case }}({{ param.name.lower_camel_case }}.stream().map(elem -> elem.rpc{{ param.type_info.inner_name }}())::iterator)
      {%- else %}
    .set{{ param.name.upper_camel_case }}({{ param.name.lower_camel_case }}.rpc{{ param.type_info.name }}())
      {%- endif %}
    {%- endfor %}
    .build();

  stub.subscribe{{ name.upper_camel_case }}(request, new StreamObserver<{{ plugin_name.upper_camel_case }}Proto.{{ name.upper_camel_case }}Response>() {

    @Override
    public void onNext({{ plugin_name.upper_camel_case }}Proto.{{ name.upper_camel_case }}Response value) {
      {%- if return_type.is_repeated %}
        {%- if return_type.is_primitive %}
      {{ name.lower_camel_case }}Processor.onNext(value.get{{ return_name.upper_camel_case }}List());
        {%- else %}
      {{ name.lower_camel_case }}Processor.onNext(value.get{{ return_name.upper_camel_case }}List().stream().map({{ return_type.inner_name }}::translateFromRpc).collect(Collectors.toList()));
        {%- endif %}
      {%- else %}
        {%- if return_type.is_primitive %}
      {{ name.lower_camel_case }}Processor.onNext(value.get{{ return_name.upper_camel_case }}());
        {%- else %}
      {{ name.lower_camel_case }}Processor.onNext({{ return_type.name }}.translateFromRpc(value.get{{ return_name.upper_camel_case }}()));
        {%- endif %}
      {%- endif %}
    }

    @Override
    public void onError(Throwable t) {
      {{ name.lower_camel_case }}Processor.onError(t);
    }

    @Override
    public void onCompleted() {
      {{ name.lower_camel_case }}Processor.onComplete();
    }
  });
}

@CheckReturnValue
public Flowable<{% if return_type.is_primitive %}{{ return_type.name }}{% elif return_type.is_repeated %}List<{{ plugin_name.upper_camel_case }}.{{ return_type.inner_name }}>{% else %}{{ plugin_name.upper_camel_case }}.{{ return_type.name }}{% endif %}> get{{ name.upper_camel_case }}({% for param in params %}@NonNull {{ param.type_info.name }} {{ param.name.lower_camel_case }}{{ ", " if not loop.last }}{% endfor %}) {
  if (!is{{ name.upper_camel_case }}Initialized) {
    synchronized (this) {
      if (!is{{ name.upper_camel_case }}Initialized) {
        MavsdkEventQueue.executor().execute(() -> process{{ name.upper_camel_case }}({% for param in params %}{{ param.name.lower_camel_case }}{{ ", " if not loop.last }}{% endfor %}));
        is{{ name.upper_camel_case }}Initialized = true;
      }
    }
  }
  return {{ name.lower_camel_case }};
}

{% else %}
// Finite stream
private Flowable<{% if return_type.is_primitive %}{{ return_type.name }}{% elif return_type.is_repeated %}List<{{ plugin_name.upper_camel_case }}.{{ return_type.inner_name }}>{% else %}{{ plugin_name.upper_camel_case }}.{{ return_type.name }}{% endif %}> create{{ name.upper_camel_case }}({% for param in params %}{{ param.type_info.name }} {{ param.name.lower_camel_case }}{{ ", " if not loop.last }}{% endfor %}) {
  {{ plugin_name.upper_camel_case }}Proto.Subscribe{{ name.upper_camel_case }}Request request = {{ plugin_name.upper_camel_case }}Proto.Subscribe{{ name.upper_camel_case }}Request.newBuilder()
  {%- for param in params %}
    {%- if param.type_info.is_primitive %}
    .set{{ param.name.upper_camel_case }}({{ param.name.lower_camel_case }})
    {%- elif param.type_info.is_repeated %}
    .addAll{{ param.name.upper_camel_case }}({{ param.name.lower_camel_case }}.stream().map(elem -> elem.rpc{{ param.type_info.inner_name }}())::iterator)
    {%- else %}
    .set{{ param.name.upper_camel_case }}({{ param.name.lower_camel_case }}.rpc{{ param.type_info.name }}())
    {%- endif %}
  {%- endfor %}
    .build();

  Flowable<{% if return_type.is_primitive %}{{ return_type.name }}{% elif return_type.is_repeated %}List<{{ plugin_name.upper_camel_case }}.{{ return_type.inner_name }}>{% else %}{{ plugin_name.upper_camel_case }}.{{ return_type.name }}{% endif %}> flowable = Flowable.create(emitter -> {
    if (!isInitialized) {
      Thread.sleep(PLUGIN_INIT_TIMEOUT_MS);
      if (!isInitialized) {
        throw new MavsdkException("No System");
      }
    }

    stub.subscribe{{ name.upper_camel_case }}(request, new StreamObserver<{{ plugin_name.upper_camel_case }}Proto.{{ name.upper_camel_case }}Response>() {

      @Override
      public void onNext({{ plugin_name.upper_camel_case }}Proto.{{ name.upper_camel_case }}Response value) {
        {{ plugin_name.upper_camel_case }}Result result = {{ plugin_name.upper_camel_case }}Result.translateFromRpc(value.get{{ plugin_name.upper_camel_case }}Result());

        switch (result.result) {
          case SUCCESS:
            emitter.onComplete();
            break;
          case NEXT:
            {%- if return_type.is_repeated %}
              {%- if return_type.is_primitive %}
            emitter.onNext(value.get{{ return_name.upper_camel_case }}List());
              {%- else %}
            emitter.onNext(value.get{{ return_name.upper_camel_case }}List().stream().map({{ return_type.inner_name }}::translateFromRpc).collect(Collectors.toList()));
              {%- endif %}
            {%- else %}
              {%- if return_type.is_primitive %}
            emitter.onNext(value.get{{ return_name.upper_camel_case }}());
              {%- else %}
            emitter.onNext({{ return_type.name }}.translateFromRpc(value.get{{ return_name.upper_camel_case }}()));
              {%- endif %}
            {%- endif %}
            break;
          default:
            emitter.onError(new {{ plugin_name.upper_camel_case }}Exception(result.result, result.resultStr));
            break;
        }
      }

      @Override
      public void onError(Throwable t) {
        emitter.onError(t);
      }

      @Override
      public void onCompleted() {
        emitter.onComplete();
      }
    });
  }, BackpressureStrategy.BUFFER);

  return flowable.subscribeOn(Schedulers.io()).share();
}

@CheckReturnValue
public Flowable<{% if return_type.is_primitive %}{{ return_type.name }}{% elif return_type.is_repeated %}List<{{ plugin_name.upper_camel_case }}.{{ return_type.inner_name }}>{% else %}{{ plugin_name.upper_camel_case }}.{{ return_type.name }}{% endif %}> {{ name.lower_camel_case }}({% for param in params %}@NonNull {{ param.type_info.name }} {{ param.name.lower_camel_case }}{{ ", " if not loop.last }}{% endfor %}) {
  return create{{ name.upper_camel_case }}({% for param in params %}{{ param.name.lower_camel_case }}{{ ", " if not loop.last }}{% endfor %});
}

{% endif %}
